package cn.tiakon.dmp.report

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.hadoop.fs._
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Reads parquet log data, aggregates record counts per province/city with
  * Spark SQL, and writes the result as JSON files to local disk.
  *
  * NOTE(review): the object is named `Sql2Parquet` but it actually writes
  * JSON output — consider renaming (kept as-is to avoid breaking callers).
  *
  * @author Tiakon
  *         2018/3/28 18:03
  */
object Sql2Parquet {

  def main(args: Array[String]): Unit = {

    // Application configuration (application.conf on the classpath);
    // supplies the input parquet path under "output.parquet.path".
    val load: Config = ConfigFactory.load()

    val sparkConf = new SparkConf().setAppName("sql转json存到磁盘")
      .setMaster("local[*]") // NOTE(review): hard-coded local master — fine for dev, drop for cluster submit
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // faster RDD serialization

    val sc = new SparkContext(sparkConf)
    val sqlc = new SQLContext(sc)

    // Read the parquet file and register it as a temporary table named "logs".
    sqlc.read.parquet(load.getString("output.parquet.path")).registerTempTable("logs")

    // Count records grouped by province and city.
    val counted: DataFrame = sqlc.sql("select count(*),provincename,cityname from logs group by provincename , cityname")

    // Single definition of the output directory (previously duplicated in the
    // delete step and the write step, which could silently diverge).
    val outputDir = "D:\\dmp-testdata\\output\\json-sql"

    // Use the file system configured by the local core-site.xml to delete any
    // previous output directory, since the writer will fail if it exists.
    val fs: FileSystem = FileSystem.get(sc.hadoopConfiguration)
    val path: Path = new Path(outputDir)
    if (fs.exists(path)) {
      fs.delete(path, true) // recursive delete
    }

    // Partitioned-output variant, kept for reference:
    //    counted.repartition(2).write.partitionBy("provincename", "cityname").json("D:\\json")

    // Coalesce to 2 partitions and write the aggregation as JSON.
    counted.repartition(2).write.json(outputDir)

    //        counted.write.jdbc()
    sc.stop()
  }

}
